package indi.mozping.protobufkafka;

import indi.mozping.protobufkafka.bean.User;
import indi.mozping.protobufkafka.inter.ProtobufSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * Demo Kafka producer that publishes a {@link User} to {@code Config.TOPIC_NAME}.
 *
 * <p>Keys are written with {@link StringSerializer}; values are written with the
 * project's {@link ProtobufSerializer}. The broker list is read from
 * {@code Config.BROKER_LIST}.
 *
 * @author mozping
 */
public class MyKafkaProducer {

    /** Number of times the sample record is sent by {@link #main(String[])}. */
    private static final int MESSAGE_COUNT = 10;

    /** Producer configuration shared by every producer created from this class. */
    private static final Properties props = new Properties();

    static {
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.BROKER_LIST);
        // Wait for acknowledgement with acks=all and allow up to 3 retries per send.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufSerializer.class.getName());
    }

    /**
     * Sends one sample {@link User} record {@value #MESSAGE_COUNT} times, keyed by the user's id.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Build the payload before opening the producer so a failure here
        // cannot leave a producer behind.
        User user = new User(101L, "kafka", "serializer@kafka.com", 1);
        ProducerRecord<String, User> record =
                new ProducerRecord<>(Config.TOPIC_NAME, Long.toString(user.getId()), user);

        // try-with-resources guarantees close() runs even if send() throws.
        try (Producer<String, User> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // NOTE(review): the Future returned by send() is not inspected, so
                // per-record delivery failures are not reported here.
                producer.send(record);
            }
        }
    }

}